package responsemanager

import (
	"context"
	"errors"
	"math"
	"time"

	logging "github.com/ipfs/go-log"
	"github.com/ipfs/go-peertaskqueue/peertask"
	ipld "github.com/ipld/go-ipld-prime"
	"github.com/libp2p/go-libp2p-core/peer"

	"github.com/ipfs/go-graphsync"
	"github.com/ipfs/go-graphsync/ipldutil"
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/notifications"
	"github.com/ipfs/go-graphsync/responsemanager/hooks"
	"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
)

var log = logging.Logger("graphsync")

const (
	maxInProcessRequests = 6
	thawSpeed            = time.Millisecond * 100
)

type inProgressResponseStatus struct {
	ctx        context.Context
	cancelFn   func()
	request    gsmsg.GraphSyncRequest
	loader     ipld.Loader
	traverser  ipldutil.Traverser
	signals    signals
	updates    []gsmsg.GraphSyncRequest
	isPaused   bool
	subscriber notifications.MappableSubscriber
}

type responseKey struct {
	p         peer.ID
	requestID graphsync.RequestID
}

type signals struct {
	pauseSignal  chan struct{}
	updateSignal chan struct{}
	errSignal    chan error
}

type responseTaskData struct {
	empty      bool
	subscriber notifications.MappableSubscriber
	ctx        context.Context
	request    gsmsg.GraphSyncRequest
	loader     ipld.Loader
	traverser  ipldutil.Traverser
	signals    signals
}

// QueryQueue is an interface that can receive new selector query tasks
// and prioritize them as needed, and pop them off later
type QueryQueue interface {
	PushTasks(to peer.ID, tasks ...peertask.Task)
	PopTasks(targetMinWork int) (peer.ID, []*peertask.Task, int)
	Remove(topic peertask.Topic, p peer.ID)
	TasksDone(to peer.ID, tasks ...*peertask.Task)
	ThawRound()
}

// RequestHooks is an interface for processing request hooks
type RequestHooks interface {
	ProcessRequestHooks(p peer.ID, request graphsync.RequestData) hooks.RequestResult
}

// BlockHooks is an interface for processing block hooks
type BlockHooks interface {
	ProcessBlockHooks(p peer.ID, request graphsync.RequestData, blockData graphsync.BlockData) hooks.BlockResult
}

// UpdateHooks is an interface for processing update hooks
type UpdateHooks interface {
	ProcessUpdateHooks(p peer.ID, request graphsync.RequestData, update graphsync.RequestData) hooks.UpdateResult
}

// CompletedListeners is an interface for notifying listeners that responses are complete
type CompletedListeners interface {
	NotifyCompletedListeners(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode)
}

// CancelledListeners is an interface for notifying listeners that requestor cancelled
type CancelledListeners interface {
	NotifyCancelledListeners(p peer.ID, request graphsync.RequestData)
}

// BlockSentListeners is an interface for notifying listeners that of a block send occuring over the wire
type BlockSentListeners interface {
	NotifyBlockSentListeners(p peer.ID, request graphsync.RequestData, block graphsync.BlockData)
}

// NetworkErrorListeners is an interface for notifying listeners that an error occurred sending a data on the wire
type NetworkErrorListeners interface {
	NotifyNetworkErrorListeners(p peer.ID, request graphsync.RequestData, err error)
}

// PeerManager is an interface that returns sender interfaces for peer responses.
type PeerManager interface {
	SenderForPeer(p peer.ID) peerresponsemanager.PeerResponseSender
}

type responseManagerMessage interface {
	handle(rm *ResponseManager)
}

// ResponseManager handles incoming requests from the network, initiates selector
// traversals, and transmits responses
type ResponseManager struct {
	ctx                   context.Context
	cancelFn              context.CancelFunc
	peerManager           PeerManager
	queryQueue            QueryQueue
	updateHooks           UpdateHooks
	cancelledListeners    CancelledListeners
	completedListeners    CompletedListeners
	blockSentListeners    BlockSentListeners
	networkErrorListeners NetworkErrorListeners
	messages              chan responseManagerMessage
	workSignal            chan struct{}
	qe                    *queryExecutor
	inProgressResponses   map[responseKey]*inProgressResponseStatus
}

// New creates a new response manager from the given context, loader,
// bridge to IPLD interface, peerManager, and queryQueue.
func New(ctx context.Context,
	loader ipld.Loader,
	peerManager PeerManager,
	queryQueue QueryQueue,
	requestHooks RequestHooks,
	blockHooks BlockHooks,
	updateHooks UpdateHooks,
	completedListeners CompletedListeners,
	cancelledListeners CancelledListeners,
	blockSentListeners BlockSentListeners,
	networkErrorListeners NetworkErrorListeners,
) *ResponseManager {
	ctx, cancelFn := context.WithCancel(ctx)
	messages := make(chan responseManagerMessage, 16)
	workSignal := make(chan struct{}, 1)
	qe := &queryExecutor{
		requestHooks:       requestHooks,
		blockHooks:         blockHooks,
		updateHooks:        updateHooks,
		cancelledListeners: cancelledListeners,
		peerManager:        peerManager,
		loader:             loader,
		queryQueue:         queryQueue,
		messages:           messages,
		ctx:                ctx,
		workSignal:         workSignal,
		ticker:             time.NewTicker(thawSpeed),
	}
	return &ResponseManager{
		ctx:                   ctx,
		cancelFn:              cancelFn,
		peerManager:           peerManager,
		queryQueue:            queryQueue,
		updateHooks:           updateHooks,
		cancelledListeners:    cancelledListeners,
		completedListeners:    completedListeners,
		blockSentListeners:    blockSentListeners,
		networkErrorListeners: networkErrorListeners,
		messages:              messages,
		workSignal:            workSignal,
		qe:                    qe,
		inProgressResponses:   make(map[responseKey]*inProgressResponseStatus),
	}
}

type processRequestMessage struct {
	p        peer.ID
	requests []gsmsg.GraphSyncRequest
}

// ProcessRequests processes incoming requests for the given peer
func (rm *ResponseManager) ProcessRequests(ctx context.Context, p peer.ID, requests []gsmsg.GraphSyncRequest) {
	select {
	case rm.messages <- &processRequestMessage{p, requests}:
	case <-rm.ctx.Done():
	case <-ctx.Done():
	}
}

type unpauseRequestMessage struct {
	p          peer.ID
	requestID  graphsync.RequestID
	response   chan error
	extensions []graphsync.ExtensionData
}

// UnpauseResponse unpauses a response that was previously paused
func (rm *ResponseManager) UnpauseResponse(p peer.ID, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
	response := make(chan error, 1)
	return rm.sendSyncMessage(&unpauseRequestMessage{p, requestID, response, extensions}, response)
}

type pauseRequestMessage struct {
	p         peer.ID
	requestID graphsync.RequestID
	response  chan error
}

// PauseResponse pauses an in progress response (may take 1 or more blocks to process)
func (rm *ResponseManager) PauseResponse(p peer.ID, requestID graphsync.RequestID) error {
	response := make(chan error, 1)
	return rm.sendSyncMessage(&pauseRequestMessage{p, requestID, response}, response)
}

type errorRequestMessage struct {
	p         peer.ID
	requestID graphsync.RequestID
	err       error
	response  chan error
}

// CancelResponse cancels an in progress response
func (rm *ResponseManager) CancelResponse(p peer.ID, requestID graphsync.RequestID) error {
	response := make(chan error, 1)
	return rm.sendSyncMessage(&errorRequestMessage{p, requestID, errCancelledByCommand, response}, response)
}

func (rm *ResponseManager) sendSyncMessage(message responseManagerMessage, response chan error) error {
	select {
	case <-rm.ctx.Done():
		return errors.New("Context Cancelled")
	case rm.messages <- message:
	}
	select {
	case <-rm.ctx.Done():
		return errors.New("Context Cancelled")
	case err := <-response:
		return err
	}
}

type synchronizeMessage struct {
	sync chan error
}

// this is a test utility method to force all messages to get processed
func (rm *ResponseManager) synchronize() {
	sync := make(chan error)
	_ = rm.sendSyncMessage(&synchronizeMessage{sync}, sync)
}

type responseDataRequest struct {
	key          responseKey
	taskDataChan chan responseTaskData
}

type finishTaskRequest struct {
	key    responseKey
	status graphsync.ResponseStatusCode
	err    error
}

type setResponseDataRequest struct {
	key       responseKey
	loader    ipld.Loader
	traverser ipldutil.Traverser
}

type responseUpdateRequest struct {
	key        responseKey
	updateChan chan []gsmsg.GraphSyncRequest
}

// Startup starts processing for the WantManager.
func (rm *ResponseManager) Startup() {
	go rm.run()
}

// Shutdown ends processing for the want manager.
func (rm *ResponseManager) Shutdown() {
	rm.cancelFn()
}

func (rm *ResponseManager) cleanupInProcessResponses() {
	for _, response := range rm.inProgressResponses {
		response.cancelFn()
	}
}

func (rm *ResponseManager) run() {
	defer rm.cleanupInProcessResponses()
	for i := 0; i < maxInProcessRequests; i++ {
		go rm.qe.processQueriesWorker()
	}

	for {
		select {
		case <-rm.ctx.Done():
			return
		case message := <-rm.messages:
			message.handle(rm)
		}
	}
}

func (rm *ResponseManager) processUpdate(key responseKey, update gsmsg.GraphSyncRequest) {
	response, ok := rm.inProgressResponses[key]
	if !ok {
		log.Warnf("received update for non existent request, peer %s, request ID %d", key.p.Pretty(), key.requestID)
		return
	}
	if !response.isPaused {
		response.updates = append(response.updates, update)
		select {
		case response.signals.updateSignal <- struct{}{}:
		default:
		}
		return
	}
	result := rm.updateHooks.ProcessUpdateHooks(key.p, response.request, update)
	peerResponseSender := rm.peerManager.SenderForPeer(key.p)
	err := peerResponseSender.Transaction(key.requestID, func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
		for _, extension := range result.Extensions {
			transaction.SendExtensionData(extension)
		}
		if result.Err != nil {
			transaction.FinishWithError(graphsync.RequestFailedUnknown)
			transaction.AddNotifee(notifications.Notifee{Topic: graphsync.RequestFailedUnknown, Subscriber: response.subscriber})
		}
		return nil
	})
	if err != nil {
		log.Errorf("Error processing update: %s", err)
	}
	if result.Err != nil {
		delete(rm.inProgressResponses, key)
		response.cancelFn()
		return
	}
	if result.Unpause {
		err := rm.unpauseRequest(key.p, key.requestID)
		if err != nil {
			log.Warnf("error unpausing request: %s", err.Error())
		}
	}

}

func (rm *ResponseManager) unpauseRequest(p peer.ID, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
	key := responseKey{p, requestID}
	inProgressResponse, ok := rm.inProgressResponses[key]
	if !ok {
		return errors.New("could not find request")
	}
	if !inProgressResponse.isPaused {
		return errors.New("request is not paused")
	}
	inProgressResponse.isPaused = false
	if len(extensions) > 0 {
		peerResponseSender := rm.peerManager.SenderForPeer(key.p)
		_ = peerResponseSender.Transaction(requestID, func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
			for _, extension := range extensions {
				transaction.SendExtensionData(extension)
			}
			return nil
		})
	}
	rm.queryQueue.PushTasks(p, peertask.Task{Topic: key, Priority: math.MaxInt32, Work: 1})
	select {
	case rm.workSignal <- struct{}{}:
	default:
	}
	return nil
}

func (rm *ResponseManager) abortRequest(p peer.ID, requestID graphsync.RequestID, err error) error {
	key := responseKey{p, requestID}
	rm.queryQueue.Remove(key, key.p)
	response, ok := rm.inProgressResponses[key]
	if !ok {
		return errors.New("could not find request")
	}

	if response.isPaused {
		peerResponseSender := rm.peerManager.SenderForPeer(key.p)
		if isContextErr(err) {

			rm.cancelledListeners.NotifyCancelledListeners(p, response.request)
			peerResponseSender.FinishWithCancel(requestID)
		} else if err != errNetworkError {
			peerResponseSender.FinishWithError(requestID, graphsync.RequestCancelled, notifications.Notifee{Topic: graphsync.RequestCancelled, Subscriber: response.subscriber})
		}
		delete(rm.inProgressResponses, key)
		response.cancelFn()
		return nil
	}
	select {
	case response.signals.errSignal <- err:
	default:
	}
	return nil
}

func (prm *processRequestMessage) handle(rm *ResponseManager) {
	for _, request := range prm.requests {
		key := responseKey{p: prm.p, requestID: request.ID()}
		if request.IsCancel() {
			_ = rm.abortRequest(prm.p, request.ID(), ipldutil.ContextCancelError{})
			continue
		}
		if request.IsUpdate() {
			rm.processUpdate(key, request)
			continue
		}
		ctx, cancelFn := context.WithCancel(rm.ctx)
		sub := notifications.NewMappableSubscriber(&subscriber{
			p:                     key.p,
			request:               request,
			ctx:                   rm.ctx,
			messages:              rm.messages,
			blockSentListeners:    rm.blockSentListeners,
			completedListeners:    rm.completedListeners,
			networkErrorListeners: rm.networkErrorListeners,
		}, notifications.IdentityTransform)
		rm.inProgressResponses[key] =
			&inProgressResponseStatus{
				ctx:        ctx,
				cancelFn:   cancelFn,
				subscriber: sub,
				request:    request,
				signals: signals{
					pauseSignal:  make(chan struct{}, 1),
					updateSignal: make(chan struct{}, 1),
					errSignal:    make(chan error, 1),
				},
			}
		// TODO: Use a better work estimation metric.
		rm.queryQueue.PushTasks(prm.p, peertask.Task{Topic: key, Priority: int(request.Priority()), Work: 1})
		select {
		case rm.workSignal <- struct{}{}:
		default:
		}
	}
}

func (rdr *responseDataRequest) handle(rm *ResponseManager) {
	response, ok := rm.inProgressResponses[rdr.key]
	var taskData responseTaskData
	if ok {
		taskData = responseTaskData{false, response.subscriber, response.ctx, response.request, response.loader, response.traverser, response.signals}
	} else {
		taskData = responseTaskData{empty: true}
	}
	select {
	case <-rm.ctx.Done():
	case rdr.taskDataChan <- taskData:
	}
}

func (ftr *finishTaskRequest) handle(rm *ResponseManager) {
	response, ok := rm.inProgressResponses[ftr.key]
	if !ok {
		return
	}
	if _, ok := ftr.err.(hooks.ErrPaused); ok {
		response.isPaused = true
		return
	}
	if ftr.err != nil {
		log.Infof("response failed: %w", ftr.err)
	}
	delete(rm.inProgressResponses, ftr.key)
	response.cancelFn()
}

func (srdr *setResponseDataRequest) handle(rm *ResponseManager) {
	response, ok := rm.inProgressResponses[srdr.key]
	if !ok {
		return
	}
	response.loader = srdr.loader
	response.traverser = srdr.traverser
}

func (rur *responseUpdateRequest) handle(rm *ResponseManager) {
	response, ok := rm.inProgressResponses[rur.key]
	var updates []gsmsg.GraphSyncRequest
	if ok {
		updates = response.updates
		response.updates = nil
	} else {
		updates = nil
	}
	select {
	case <-rm.ctx.Done():
	case rur.updateChan <- updates:
	}
}

func (sm *synchronizeMessage) handle(rm *ResponseManager) {
	select {
	case <-rm.ctx.Done():
	case sm.sync <- nil:
	}
}

func (urm *unpauseRequestMessage) handle(rm *ResponseManager) {
	err := rm.unpauseRequest(urm.p, urm.requestID, urm.extensions...)
	select {
	case <-rm.ctx.Done():
	case urm.response <- err:
	}
}

func (prm *pauseRequestMessage) pauseRequest(rm *ResponseManager) error {
	key := responseKey{prm.p, prm.requestID}
	inProgressResponse, ok := rm.inProgressResponses[key]
	if !ok {
		return errors.New("could not find request")
	}
	if inProgressResponse.isPaused {
		return errors.New("request is already paused")
	}
	select {
	case inProgressResponse.signals.pauseSignal <- struct{}{}:
	default:
	}
	return nil
}

func (prm *pauseRequestMessage) handle(rm *ResponseManager) {
	err := prm.pauseRequest(rm)
	select {
	case <-rm.ctx.Done():
	case prm.response <- err:
	}
}

func (crm *errorRequestMessage) handle(rm *ResponseManager) {
	err := rm.abortRequest(crm.p, crm.requestID, crm.err)
	select {
	case <-rm.ctx.Done():
	case crm.response <- err:
	}
}
